package org.jetlinks.pro.clickhouse;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

/**
 * Built-in {@link Format} implementations, looked up by name through {@link #of(String)}.
 * <p>
 * The id of each constant is its enum constant name (see {@link #getId()}).
 */
@Slf4j
public enum DefaultFormat implements Format {

    /**
     * Format whose response body is one JSON object carrying the rows in its {@code data} array.
     */
    JSON {
        /**
         * Parses the response body into rows built by the given wrapper.
         * <p>
         * All buffers are joined into one, parsed as a single JSON object and then released.
         * Every element of the {@code data} array is turned into a row instance; each entry of
         * the element is passed to {@link ResultWrapper#wrapColumn} with an incrementing column index.
         * Emission stops early when {@link ResultWrapper#completedWrapRow} returns {@code false}
         * or when the subscriber has cancelled. A body without a {@code data} array yields no rows.
         * {@link ResultWrapper#completedWrap()} is invoked when the resulting flux completes.
         *
         * @param result  the raw response body
         * @param wrapper the wrapper used to create and populate row instances
         * @param <R>     the row type
         * @return a flux of wrapped rows; it errors if the body cannot be parsed as JSON
         */
        @Override
        public <R> Flux<R> parse(Flux<DataBuffer> result, ResultWrapper<R, ?> wrapper) {

            // FIXME: 2021/5/6 parse the data with streaming reads instead of joining the whole body.
            return DataBufferUtils
                .join(result)
                .flatMapMany(dataBuffer -> Mono
                    .<JSONObject>fromCallable(() -> {
                        try {
                            return com.alibaba.fastjson.JSON.parseObject(dataBuffer.asInputStream(false), JSONObject.class);
                        } catch (Throwable err) {
                            // Log the raw body so that an unparsable response can be diagnosed.
                            String response = dataBuffer.toString(0, dataBuffer.writePosition(), StandardCharsets.UTF_8);
                            log.warn("parse clickhouse response error, response: {}", response, err);
                            throw err;
                        } finally {
                            // The stream was opened with releaseOnClose=false, so release explicitly.
                            DataBufferUtils.release(dataBuffer);
                        }
                    })
                    .flatMapMany(json -> Flux
                        .<R>create(sink -> {
                            JSONArray data = json.getJSONArray("data");
                            if (data == null) {
                                // No "data" member in the response: nothing to emit.
                                sink.complete();
                                return;
                            }
                            for (JSONObject row : data.toJavaList(JSONObject.class)) {
                                if (sink.isCancelled()) {
                                    // The subscriber is gone; do not keep wrapping rows nobody will receive.
                                    return;
                                }
                                R instance = DefaultFormat.wrapRow(wrapper, row);
                                sink.next(instance);
                                if (!wrapper.completedWrapRow(instance)) {
                                    // The wrapper asked to stop consuming further rows.
                                    break;
                                }
                            }
                            sink.complete();
                        })))
                .doOnComplete(wrapper::completedWrap);

        }

    }

    // FIXME: 2021/5/6 parse the other formats

    ;

    /**
     * Finds the built-in format whose id equals the given name, ignoring case.
     *
     * @param name the format name; {@code null} matches nothing
     * @return the matching format, or an empty optional when no constant matches
     */
    public static Optional<Format> of(String name) {
        for (DefaultFormat candidate : values()) {
            if (candidate.getId().equalsIgnoreCase(name)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * @return the enum constant name, used as the format id
     */
    @Nonnull
    @Override
    public String getId() {
        return name();
    }

    /**
     * Creates a new row instance and feeds every entry of the JSON row to the wrapper as a column.
     *
     * @param wrapper the wrapper that creates and populates the row
     * @param row     one element of the {@code data} array
     * @param <R>     the row type
     * @return the populated row instance
     */
    private static <R> R wrapRow(ResultWrapper<R, ?> wrapper, JSONObject row) {
        R instance = wrapper.newRowInstance();
        int index = 0;
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            wrapper.wrapColumn(new DefaultColumnWrapperContext<>(index++,
                                                                 entry.getKey(),
                                                                 entry.getValue(),
                                                                 instance));
        }
        return instance;
    }
}
